package com.isyscore.os.metadata.kettle.step;

import com.isyscore.os.metadata.enums.KettleDataFlowNodeType;
import com.isyscore.os.metadata.kettle.base.AbstractStep;
import com.isyscore.os.metadata.kettle.base.FlowConfig;
import com.isyscore.os.metadata.kettle.base.FlowHup;
import com.isyscore.os.metadata.kettle.vis.*;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.plugins.PluginInterface;
import org.pentaho.di.core.plugins.PluginRegistry;
import org.pentaho.di.core.plugins.StepPluginType;
import org.pentaho.di.trans.TransHopMeta;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;
import org.pentaho.di.trans.step.errorhandling.StreamInterface;
import org.pentaho.di.trans.steps.mergejoin.MergeJoinMeta;
import org.pentaho.di.trans.steps.mergerows.MergeRows;
import org.pentaho.di.trans.steps.mergerows.MergeRowsMeta;

import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Translates a {@link MergeRowsNode} from the visual flow model into a Kettle
 * {@link MergeRowsMeta} step, and rewires the transformation so that each of the
 * two merge inputs passes through an automatically inserted sort step first.
 */
public class MergeRowsStep extends AbstractStep {

    /**
     * Suffix appended to an upstream step name to name the sort step that is
     * inserted between that upstream step and the merge step. The literal text
     * means "default sort"; it is part of the generated step names and must not change.
     */
    private static final String SORT_SUFFIX = "-默认排序";

    /**
     * Builds the Kettle step definition for a merge-rows node.
     *
     * <p>The two info streams of the {@link MergeRowsMeta} are bound to placeholder
     * {@link StepMeta} objects that carry only a name: stream 0 gets the node's
     * "old" step name and stream 1 gets its "new" step name. The returned step is
     * NOT added to {@code transMeta}; that is left to the caller.
     *
     * @param step       the visual node; must be a {@link MergeRowsNode}
     * @param databases  not used by this method
     * @param transMeta  not used by this method
     * @param transGraph not used by this method
     * @return the configured merge-rows step, flagged to be drawn
     * @throws Exception if the step plugin cannot be located or loaded
     */
    @Override
    public StepMeta decode(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {
        MergeRowsNode node = (MergeRowsNode) step;
        String pluginId = KettleDataFlowNodeType.MergeRows.name();

        PluginRegistry registry = PluginRegistry.getInstance();
        PluginInterface sp = registry.findPluginWithId(StepPluginType.class, pluginId);
        StepMetaInterface stepMetaInterface = (StepMetaInterface) registry.loadClass(sp);
        MergeRowsMeta mergeRowsMeta = (MergeRowsMeta) stepMetaInterface;

        // Index 0 is treated as the reference ("old") input, index 1 as the compare ("new") input.
        List<StreamInterface> infoStreams = mergeRowsMeta.getStepIOMeta().getInfoStreams();
        StreamInterface referenceStream = infoStreams.get(0);
        StreamInterface compareStream = infoStreams.get(1);
        referenceStream.setStepMeta(placeholderStep(node.getOldStepName()));
        compareStream.setStepMeta(placeholderStep(node.getNewStepName()));

        mergeRowsMeta.setFlagField(node.getFlagField());
        // Compare fields become the merge keys; data fields become the compared values.
        mergeRowsMeta.setKeyFields(node.getCompareFileds().toArray(new String[0]));
        mergeRowsMeta.setValueFields(node.getDataFileds().toArray(new String[0]));

        StepMeta stepMeta = new StepMeta(pluginId, node.getNodeName(), stepMetaInterface);
        stepMeta.setDraw(true);
        return stepMeta;
    }

    /**
     * Inserts a sort step in front of each merge input and rebuilds all hops.
     *
     * <p>For each of the node's old/new upstream steps, a sort step named
     * {@code <upstream> + SORT_SUFFIX} is created and two hops are added:
     * {@code upstream -> sort} and {@code sort -> merge}. Every existing hop that
     * pointed at the merge step is dropped; all other existing hops are kept.
     *
     * <p>Side effects: the node's old/new step names are overwritten with the
     * sort-step names, the merge step in {@code transMeta} is replaced with a
     * freshly decoded one, two sort steps are appended to {@code transMeta}, and
     * the hop lists of both {@code transMeta} and {@code transGraph} are replaced.
     *
     * <p>NOTE(review): because the node is mutated, invoking this method twice on
     * the same node instance appends the suffix a second time — confirm callers
     * only invoke it once per node.
     *
     * @param step       the visual node; must be a {@link MergeRowsNode}
     * @param databases  passed through to the step decoders
     * @param transMeta  transformation that is modified in place
     * @param transGraph flow configuration whose hop list is read and then replaced
     * @return always {@code null}
     * @throws Exception if decoding the merge step or a sort step fails
     */
    @Override
    public StepMeta after(VisNode step, List<DatabaseMeta> databases, TransMeta transMeta, FlowConfig transGraph) throws Exception {
        MergeRowsNode mergeRowsNode = (MergeRowsNode) step;

        String mergeName = step.getNodeName();
        String oldSource = mergeRowsNode.getOldStepName();
        String newSource = mergeRowsNode.getNewStepName();
        String oldSort = oldSource + SORT_SUFFIX;
        String newSort = newSource + SORT_SUFFIX;

        // New wiring first, then every pre-existing hop that does not end at the merge step.
        List<FlowHup> newTransHups = new ArrayList<>();
        newTransHups.add(new FlowHup(oldSort, mergeName));
        newTransHups.add(new FlowHup(oldSource, oldSort));
        newTransHups.add(new FlowHup(newSort, mergeName));
        newTransHups.add(new FlowHup(newSource, newSort));
        for (FlowHup existing : transGraph.getTransHups()) {
            if (!existing.getTo().equals(mergeName)) {
                newTransHups.add(existing);
            }
        }

        // Re-decode the merge step so its info streams reference the sort steps instead of the sources.
        mergeRowsNode.setOldStepName(oldSort);
        mergeRowsNode.setNewStepName(newSort);
        StepMeta mergeMeta = decode(mergeRowsNode, databases, transMeta, transGraph);
        updateStep(transMeta, mergeRowsNode.getNodeName(), mergeMeta);

        // Both sort steps must be registered before hops are resolved by step name below.
        addSortStep(oldSort, mergeRowsNode, databases, transMeta, transGraph);
        addSortStep(newSort, mergeRowsNode, databases, transMeta, transGraph);

        List<TransHopMeta> hopMetas = newTransHups.stream()
                .map(h -> new TransHopMeta(getStep(transMeta, h.getFrom()), getStep(transMeta, h.getTo())))
                .collect(Collectors.toList());
        transMeta.setTransHops(hopMetas);

        // Mirror the resolved hops back into the flow configuration.
        transGraph.setTransHups(hopMetas.stream()
                .map(h -> new FlowHup(h.getFromStep().getName(), h.getToStep().getName()))
                .collect(Collectors.toList()));
        return null;
    }

    /** Creates a {@link StepMeta} that carries nothing but the given name. */
    private static StepMeta placeholderStep(String name) {
        StepMeta placeholder = new StepMeta();
        placeholder.setName(name);
        return placeholder;
    }

    /**
     * Decodes a sort step named {@code sortStepName} that sorts on the merge node's
     * compare fields, and appends it to {@code transMeta}'s step list.
     * Each field is wrapped as {@code new SortField(field, true)}; the meaning of the
     * boolean is defined by {@code SortField} (presumably ascending — TODO confirm).
     */
    private void addSortStep(String sortStepName, MergeRowsNode mergeRowsNode, List<DatabaseMeta> databases,
                             TransMeta transMeta, FlowConfig transGraph) throws Exception {
        SortRowsNode sortNode = new SortRowsNode();
        sortNode.setNodeName(sortStepName);
        List<SortField> sortFields = mergeRowsNode.getCompareFileds().stream()
                .map(f -> new SortField(f, true))
                .collect(Collectors.toList());
        sortNode.setSortFields(sortFields);

        StepMeta sortMeta = new SortRowsStep().decode(sortNode, databases, transMeta, transGraph);
        sortMeta.setParentTransMeta(transMeta);
        transMeta.getSteps().add(sortMeta);
    }
}
